01 Moving Average (MA) Model#

Goals#

  • Define the MA(\(q\)) model (and how it differs from a rolling mean).

  • Explain the generative “algorithm” and the math behind it.

  • Understand assumptions, requirements, and invertibility.

  • Implement MA simulation/inversion in NumPy.

  • Visualize the effect of order, impulse response, and residual autocorrelation.

Prerequisites#

  • Basic probability (expectation, variance, covariance)

  • Time series basics (lags, autocorrelation)

  • Python: NumPy + Plotly

Model definition (MA(\(q\)))#

An MA(\(q\)) model is a finite linear filter of white-noise shocks. It is not the same thing as the rolling/simple moving average used for smoothing.

Time-domain form#

\[ X_t = \mu + \varepsilon_t + \theta_1 \varepsilon_{t-1} + \cdots + \theta_q \varepsilon_{t-q}, \qquad \varepsilon_t \sim \text{WN}(0,\sigma^2). \]

Convenient shorthand: set \(\theta_0 = 1\) so \[ X_t - \mu = \sum_{i=0}^q \theta_i \varepsilon_{t-i}. \]

Backshift-operator form#

Let \(B X_t = X_{t-1}\). Then \[ X_t - \mu = \Theta(B)\,\varepsilon_t, \qquad \Theta(B) = 1 + \theta_1 B + \cdots + \theta_q B^q. \]

What the “algorithm” is#

1) Generating (simulating) an MA(\(q\))#

For each time step \(t\):

  1. Draw a new shock \(\varepsilon_t\).

  2. Output \(X_t\) as a weighted sum of the current and the last \(q\) shocks.

In other words, MA(\(q\)) is an FIR (finite impulse response) filter applied to white noise.
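As a quick sketch (with illustrative coefficients), this filtering view can be expressed with `np.convolve`, since an MA(\(q\)) output is just the convolution of the shock sequence with the kernel \((1, \theta_1, \dots, \theta_q)\):

```python
import numpy as np

rng = np.random.default_rng(0)

theta = np.array([0.8, 0.3])          # illustrative MA(2) coefficients
eps = rng.normal(0.0, 1.0, size=500)  # white-noise shocks

# MA(q) as an FIR filter: convolve shocks with the kernel (1, theta_1, ..., theta_q)
kernel = np.r_[1.0, theta]
x = np.convolve(eps, kernel)[: eps.size]

# Spot-check the defining recursion X_t = eps_t + theta_1*eps_{t-1} + theta_2*eps_{t-2}
assert np.isclose(x[2], eps[2] + 0.8 * eps[1] + 0.3 * eps[0])
```

The truncation `[: eps.size]` keeps the output aligned with the shock sequence; the first \(q\) values implicitly assume zero pre-sample shocks.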

2) Using MA(\(q\)) as a statistical model (Box–Jenkins view)#

  • Identify \(q\): the ACF often shows a sharp cutoff after lag \(q\) (heuristic).

  • Estimate parameters: typically via (Gaussian) maximum likelihood / innovations algorithms.

  • Diagnose fit: residuals (estimated shocks) should look like white noise; residual ACF should be near 0.

  • Forecast: beyond \(q\) steps ahead, the optimal forecast reverts to \(\mu\) because future shocks have mean 0.
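The forecast step can be made concrete with a minimal sketch for a hypothetical MA(2); the values of \(\mu\), \(\theta\), and the recent shocks below are all assumptions:

```python
import numpy as np

# Hypothetical MA(2) at the forecast origin t (all values are illustrative).
mu = 5.0
theta = np.array([0.8, 0.3])
eps_t, eps_tm1 = 1.2, -0.4  # the two most recent (estimated) shocks

# Future shocks have mean 0, so only the known shocks contribute.
f1 = mu + theta[0] * eps_t + theta[1] * eps_tm1  # h=1: both shocks still in the window
f2 = mu + theta[1] * eps_t                       # h=2: only eps_t remains
f3 = mu                                          # h=3 > q: the forecast is just the mean
```

Past lag \(q\) every remaining term is an unrealized future shock, which is why the forecast flattens at \(\mu\).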

How it works mathematically (step-by-step)#

Write the centered process as \(Y_t = X_t - \mu = \sum_{i=0}^q \theta_i \varepsilon_{t-i}\).

Mean#

\[ \mathbb{E}[X_t] = \mu + \sum_{i=0}^q \theta_i\,\mathbb{E}[\varepsilon_{t-i}] = \mu. \]

Autocovariance#

For lag \(k \ge 0\): \[ \begin{aligned} \gamma(k) &= \text{Cov}(Y_t, Y_{t-k}) \\ &= \text{Cov}\Big(\sum_{i=0}^q \theta_i \varepsilon_{t-i},\ \sum_{j=0}^q \theta_j \varepsilon_{t-k-j}\Big) \\ &= \sum_{i=0}^q \sum_{j=0}^q \theta_i\theta_j\,\text{Cov}(\varepsilon_{t-i}, \varepsilon_{t-k-j}). \end{aligned} \]

Because \(\{\varepsilon_t\}\) is white noise, \(\text{Cov}(\varepsilon_{t-i}, \varepsilon_{t-k-j}) = \sigma^2\) only when \(t-i = t-k-j\) (i.e. \(j = i-k\)); otherwise it is 0. Only the overlapping terms survive: \[ \gamma(k) = \sigma^2 \sum_{i=0}^{q-k} \theta_i\theta_{i+k}, \qquad k=0,1,\dots,q. \] For \(|k|>q\) the shocks no longer overlap, so \(\gamma(k)=0\).

Autocorrelation (ACF)#

\[ \rho(k) = \frac{\gamma(k)}{\gamma(0)}. \] Key takeaway: the theoretical ACF of an MA(\(q\)) process is exactly 0 for lags \(|k|>q\) (“cutoff”).
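As a numeric sanity check of the formula (with illustrative MA(2) coefficients), the cutoff is exact, not approximate:

```python
import numpy as np

theta_full = np.array([1.0, 0.8, 0.3])  # (theta_0, theta_1, theta_2), illustrative
sigma2 = 1.0

# gamma(k) = sigma^2 * sum_i theta_i * theta_{i+k}; zero once k > q
gamma = np.array([
    sigma2 * np.sum(theta_full[k:] * theta_full[: theta_full.size - k])
    if k < theta_full.size else 0.0
    for k in range(6)
])
rho = gamma / gamma[0]
# rho is approximately [1, 0.601, 0.173, 0, 0, 0]: exact zeros beyond lag q=2
```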

Impulse response#

If a single unit shock occurs at time \(t\) (\(\varepsilon_t=1\), all other shocks zero), then \[ X_{t+h} - \mu = \theta_h \quad \text{for } h=0,1,\dots,q, \qquad \text{and } 0 \text{ thereafter}, \] where \(\theta_0=1\). So MA models have finite shock memory.
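A tiny sketch confirms this: feeding a single unit impulse through the filter returns the coefficient sequence itself (the coefficients here are illustrative):

```python
import numpy as np

theta = np.array([0.8, 0.3, 0.1])  # illustrative MA(3) coefficients
impulse = np.zeros(10)
impulse[0] = 1.0  # one unit shock at t=0, all other shocks zero

# Convolving the impulse with the kernel reproduces (1, theta_1, ..., theta_q, 0, ...)
response = np.convolve(impulse, np.r_[1.0, theta])[: impulse.size]
# response: [1.0, 0.8, 0.3, 0.1, 0.0, ...] -- the shock is forgotten after q steps
```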

How and when it is used (and what it models)#

MA(\(q\)) is used when dependence comes from short-lived disturbances rather than long-run persistence.

Typical scenarios / incident and noise patterns:

  • Measurement noise with short carryover (sensor “ringing”, smoothing/filtering artifacts).

  • One-off incidents that affect a few subsequent observations (temporary outage recovery, delayed reporting).

  • Aggregation / batching effects (the value at time \(t\) mixes several recent random contributions).

  • Microstructure noise in finance (e.g., bid–ask bounce creates short-range negative autocorrelation).

Practical modeling:

  • Standalone MA models are common for short-memory series.

  • More often, MA terms appear inside ARMA/ARIMA/SARIMA models to capture short-run shock structure.

Model assumptions and requirements#

Core assumptions (for the MA part):

  • Innovations \(\varepsilon_t\) have zero mean, constant variance \(\sigma^2\), and are uncorrelated over time (often assumed i.i.d.).

  • Parameters \(\mu,\theta_1,\dots,\theta_q\) are constant over the sample.

  • The relationship is linear and time-invariant.

Notes:

  • Stationarity: any finite MA(\(q\)) with finite-variance shocks is (weakly) stationary; no restrictions on \(\theta\) are needed for stationarity.

  • For likelihood-based estimation and standard errors, people often assume Gaussian shocks; without Gaussianity, Gaussian MLE is still a common quasi-MLE.

  • Estimation typically needs a sample size meaningfully larger than \(q\), and diagnostics rely on having enough data to detect residual autocorrelation.

Invertibility#

MA parameters are not always uniquely identified from second-order moments unless we impose invertibility.

Define the MA polynomial \[ \Theta(z) = 1 + \theta_1 z + \cdots + \theta_q z^q. \] The MA(\(q\)) model is invertible if all roots of \(\Theta(z)\) lie outside the unit circle: \[ |z_i| > 1 \quad \text{for every root } z_i. \]

Why it matters:

  • It guarantees a stable inverse filter, so the shocks can be recovered from the data: \(\varepsilon_t = \Theta(B)^{-1}(X_t-\mu)\), an AR(\(\infty\)) expansion with absolutely summable coefficients.

  • It makes the MA representation unique (no parameter aliases).
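A concrete MA(1) instance of the aliasing (parameter values chosen for illustration): \(\theta\) and \(1/\theta\) imply the same lag-1 autocorrelation, so second-order moments alone cannot tell them apart.

```python
# MA(1): rho(1) = theta / (1 + theta^2) is invariant under theta -> 1/theta.
theta_inv = 0.5    # invertible representation (|theta| < 1)
theta_alias = 2.0  # non-invertible alias (1 / 0.5)

rho_inv = theta_inv / (1.0 + theta_inv**2)        # 0.5 / 1.25 = 0.4
rho_alias = theta_alias / (1.0 + theta_alias**2)  # 2.0 / 5.0  = 0.4
# identical ACFs; the invertibility convention picks the |theta| < 1 version
```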

MA(1) intuition#

For MA(1): \(X_t-\mu = (1+\theta B)\varepsilon_t\). If \(|\theta|<1\), the inverse expands as a convergent series: \[ (1+\theta B)^{-1} = 1 - \theta B + \theta^2 B^2 - \theta^3 B^3 + \cdots. \] If \(|\theta|\ge 1\), this series does not converge, and computing shocks from observations becomes numerically unstable.
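This convergence can be checked numerically. A minimal sketch (with an assumed \(\theta = 0.6\)) recovers a shock from the observations via the truncated series:

```python
import numpy as np

rng = np.random.default_rng(1)
theta = 0.6  # |theta| < 1, so the inverse series converges

eps = rng.normal(size=300)
x = eps.copy()
x[1:] += theta * eps[:-1]  # MA(1): X_t = eps_t + theta * eps_{t-1}

# Truncated AR(infinity) inversion: eps_t ~= sum_j (-theta)^j * X_{t-j}
t, J = 250, 40
approx = sum((-theta) ** j * x[t - j] for j in range(J))
# the truncation error shrinks like |theta|^J, so approx matches eps[t] closely
assert abs(approx - eps[t]) < 1e-6
```

With \(|\theta| \ge 1\) the same partial sums would grow without bound instead of converging.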

Intuition behind the error terms (innovations)#

The shocks \(\varepsilon_t\) represent new information that was not predictable from the past. An MA model says: today’s observation is the current shock plus echoes of a few recent shocks.

Abstract examples:

  • Echo / ringing: a calibration event perturbs the next few readings.

  • Delayed reporting: one random reporting error spills into a couple of subsequent periods.

  • Batching: what you record at \(t\) includes several recent random contributions.

import numpy as np

import plotly.graph_objects as go
import os
import plotly.io as pio
from plotly.subplots import make_subplots

pio.renderers.default = os.environ.get("PLOTLY_RENDERER", "notebook")
np.set_printoptions(precision=4, suppress=True)

rng = np.random.default_rng(42)
def ma_filter(eps, theta, mu=0.0):
    # x_t = mu + eps_t + theta_1 * eps_{t-1} + ... + theta_q * eps_{t-q}
    eps = np.asarray(eps, dtype=float)
    theta = np.asarray(theta, dtype=float)

    q = theta.size
    n = eps.size
    x = np.empty(n, dtype=float)

    for t in range(n):
        value = eps[t]
        for k in range(1, q + 1):
            if t - k >= 0:
                value += theta[k - 1] * eps[t - k]
        x[t] = mu + value

    return x


def simulate_ma(
    theta,
    n,
    mu=0.0,
    sigma=1.0,
    rng=None,
    burnin=200,
):
    if rng is None:
        rng = np.random.default_rng()

    eps = rng.normal(0.0, sigma, size=n + burnin)
    x = ma_filter(eps, theta, mu=mu)
    return x[burnin:], eps[burnin:]


def innovations_from_ma(x, theta, mu=0.0):
    # Recover shocks recursively: eps_t = (x_t - mu) - sum_k theta_k * eps_{t-k}.
    # Numerically stable only when theta is invertible.
    x = np.asarray(x, dtype=float)
    theta = np.asarray(theta, dtype=float)

    q = theta.size
    n = x.size
    eps = np.zeros(n, dtype=float)

    for t in range(n):
        value = x[t] - mu
        for k in range(1, q + 1):
            if t - k >= 0:
                value -= theta[k - 1] * eps[t - k]
        eps[t] = value

    return eps


def sample_acf(x, max_lag):
    x = np.asarray(x, dtype=float)
    x = x - x.mean()

    denom = float(np.dot(x, x))
    if denom == 0.0:
        return np.r_[1.0, np.zeros(max_lag)]

    acf = np.empty(max_lag + 1, dtype=float)
    acf[0] = 1.0
    for k in range(1, max_lag + 1):
        acf[k] = float(np.dot(x[k:], x[:-k]) / denom)
    return acf


def theoretical_acf_ma(theta, max_lag):
    theta = np.asarray(theta, dtype=float)
    q = theta.size

    theta_full = np.r_[1.0, theta]
    gamma = np.zeros(max_lag + 1, dtype=float)
    for k in range(0, max_lag + 1):
        if k <= q:
            gamma[k] = float(np.sum(theta_full[: q + 1 - k] * theta_full[k:]))

    return gamma / gamma[0]


def is_invertible(theta, tol=1e-12):
    # Invertible iff every root of Theta(z) = 1 + theta_1 z + ... + theta_q z^q
    # lies strictly outside the unit circle.
    theta = np.asarray(theta, dtype=float)
    if theta.size == 0:
        return True

    # np.roots expects coefficients in decreasing degree order.
    coeffs = np.r_[theta[::-1], 1.0]
    roots = np.roots(coeffs)
    return bool(np.all(np.abs(roots) > 1 + tol))


def fit_ma1_via_acf(x):
    # Method-of-moments MA(1) fit: invert rho(1) = theta / (1 + theta^2)
    # and keep the invertible root (|theta| < 1).
    x = np.asarray(x, dtype=float)

    mu_hat = float(x.mean())
    x0 = x - mu_hat

    r1 = float(sample_acf(x0, 1)[1])
    r1 = float(np.clip(r1, -0.499, 0.499))

    if abs(r1) < 1e-12:
        theta_hat = 0.0
    else:
        disc = max(0.0, 1.0 - 4.0 * r1 * r1)
        s = disc ** 0.5
        theta_a = (1.0 + s) / (2.0 * r1)
        theta_b = (1.0 - s) / (2.0 * r1)
        theta_hat = min([theta_a, theta_b], key=lambda v: abs(v))
        theta_hat = float(np.clip(theta_hat, -0.99, 0.99))

    var_hat = float(np.mean(x0 * x0))
    sigma2_hat = var_hat / (1.0 + theta_hat * theta_hat)
    sigma_hat = float(sigma2_hat ** 0.5)

    return mu_hat, np.array([theta_hat]), sigma_hat


def fit_ma_via_acf_random_search(
    x,
    q,
    max_lag=20,
    n_candidates=20000,
    seed=0,
):
    # Crude moment matching: random-search over invertible theta vectors,
    # minimizing squared distance between sample and theoretical ACFs.
    x = np.asarray(x, dtype=float)

    mu_hat = float(x.mean())
    x0 = x - mu_hat
    target = sample_acf(x0, max_lag)

    rng_local = np.random.default_rng(seed)
    best_theta = None
    best_loss = np.inf

    for _ in range(n_candidates):
        cand = rng_local.uniform(-0.99, 0.99, size=q)
        if not is_invertible(cand):
            continue
        theo = theoretical_acf_ma(cand, max_lag)
        loss = float(np.mean((target[1:] - theo[1:]) ** 2))
        if loss < best_loss:
            best_loss = loss
            best_theta = cand

    if best_theta is None:
        best_theta = np.zeros(q, dtype=float)

    var_hat = float(np.mean(x0 * x0))
    sigma2_hat = var_hat / float(np.sum(np.r_[1.0, best_theta] ** 2))
    sigma_hat = float(sigma2_hat ** 0.5)

    return mu_hat, best_theta, sigma_hat
orders = {
    "MA(0)": np.array([]),
    "MA(1)": np.array([0.8]),
    "MA(2)": np.array([0.8, 0.3]),
    "MA(5)": np.array([0.8, 0.3, 0.1, 0.05, 0.02]),
}

burnin = 50
n = 250
max_lag = 20

eps = rng.normal(0.0, 1.0, size=n + burnin)

fig = make_subplots(
    rows=2,
    cols=len(orders),
    column_titles=[
        f"{name} (invertible={is_invertible(theta)})" for name, theta in orders.items()
    ],
    row_titles=["sample path", "sample ACF"],
    vertical_spacing=0.15,
)

for col, (name, theta) in enumerate(orders.items(), start=1):
    x = ma_filter(eps, theta)[burnin:]

    fig.add_trace(
        go.Scatter(y=x, mode="lines", name=name, showlegend=False),
        row=1,
        col=col,
    )

    acf = sample_acf(x, max_lag)
    fig.add_trace(
        go.Bar(x=np.arange(max_lag + 1), y=acf, showlegend=False),
        row=2,
        col=col,
    )

    fig.update_xaxes(title_text="t", row=1, col=col)
    fig.update_xaxes(title_text="lag", row=2, col=col)
    fig.update_yaxes(title_text="X_t", row=1, col=col)
    fig.update_yaxes(title_text="ACF", row=2, col=col, range=[-1, 1])

fig.update_layout(
    height=650,
    width=1150,
    title_text="Effect of MA order: sample paths and ACF cutoff",
)
fig.show()
theta = orders["MA(5)"]
impulse = np.r_[1.0, theta]
lags = np.arange(impulse.size)

fig = go.Figure(go.Bar(x=lags, y=impulse))
fig.update_layout(
    title="MA impulse response (finite-memory filter coefficients)",
    xaxis_title="lag h",
    yaxis_title="response weight (θ_h, with θ_0=1)",
)
fig.show()
theta_true = np.array([0.8, 0.3])
x, _eps_true = simulate_ma(theta_true, n=1000, sigma=1.0, rng=rng, burnin=200)

mu_1, theta_1, sigma_1 = fit_ma1_via_acf(x)
mu_2, theta_2, sigma_2 = fit_ma_via_acf_random_search(
    x, q=2, max_lag=20, n_candidates=25000, seed=123
)

resid_1 = innovations_from_ma(x, theta_1, mu=mu_1)
resid_2 = innovations_from_ma(x, theta_2, mu=mu_2)

max_lag = 20
acf_resid_1 = sample_acf(resid_1, max_lag)
acf_resid_2 = sample_acf(resid_2, max_lag)

conf = 1.96 / np.sqrt(x.size)
lags = np.arange(1, max_lag + 1)

fig = make_subplots(
    rows=1,
    cols=2,
    column_titles=["Residual ACF after fitting MA(1)", "Residual ACF after fitting MA(2)"],
)

for col, acf_resid in enumerate([acf_resid_1, acf_resid_2], start=1):
    fig.add_trace(go.Bar(x=lags, y=acf_resid[1:], showlegend=False), row=1, col=col)
    fig.add_trace(
        go.Scatter(
            x=[lags[0], lags[-1]],
            y=[conf, conf],
            mode="lines",
            line=dict(color="crimson", dash="dash"),
            showlegend=False,
        ),
        row=1,
        col=col,
    )
    fig.add_trace(
        go.Scatter(
            x=[lags[0], lags[-1]],
            y=[-conf, -conf],
            mode="lines",
            line=dict(color="crimson", dash="dash"),
            showlegend=False,
        ),
        row=1,
        col=col,
    )

    fig.update_xaxes(title_text="lag", row=1, col=col)
    fig.update_yaxes(title_text="ACF", row=1, col=col, range=[-1, 1])

fig.update_layout(
    height=350,
    width=950,
    title_text="Residual autocorrelation diagnostic (white-noise residuals ≈ 0)",
)
fig.show()

(mu_1, theta_1, sigma_1), (mu_2, theta_2, sigma_2)
((-0.05216475228517619, array([0.9387]), 1.0124577283931864),
 (-0.05216475228517619, array([0.8422, 0.4018]), 1.0152504473095068))

Pitfalls and diagnostics#

  • Don’t confuse MA(\(q\)) with the rolling mean used for smoothing.

  • MA(\(q\)) is always stationary, but invertibility matters for identification and stable residual computation.

  • ACF cutoff is a heuristic; always check residual autocorrelation (and, in practice, tests like Ljung–Box).

  • MA parameters can be hard to estimate in small samples; likelihood surfaces can be flat and multi-modal.